﻿<!DOCTYPE html>
<html>

<head>
  <meta charset="utf-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>SpringBoot(26) Integrating Kafka</title>
  <link rel="stylesheet" href="https://stackedit.io/style.css" />
</head>

<body class="stackedit">
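  <div class="stackedit__html"><h3><a id="_0"></a>I. Introduction</h3>
<h4><a id="Kafka_2"></a>What is Kafka?</h4>
<ol>
<li>Kafka is a <code>high-throughput</code>, <code>distributed</code> publish-subscribe <code>messaging system</code> that can handle all of the activity stream data generated by users on a website.</li>
<li>Purpose: to unify online and offline message processing through Hadoop's parallel loading mechanism, and to deliver messages in real time across a cluster.</li>
</ol>
<h4><a id="_7"></a>Environment</h4>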
<ol>
<li>Spring Boot</li>
<li>IntelliJ IDEA</li>
<li>docker-compose</li>
<li>zookeeper</li>
<li>kafka</li>
<li>kafka-manager</li>
</ol>
<h3><a id="dockercomposekafka_17"></a>II. Installing Kafka with docker-compose</h3>
<h4><a id="1_dockercomposekafkayml_19"></a>1. docker-compose-kafka.yml</h4>
<pre><code class="prism language-yml"><span class="token key atrule">version</span><span class="token punctuation">:</span> <span class="token string">'3'</span>
<span class="token key atrule">services</span><span class="token punctuation">:</span>
  <span class="token key atrule">zookeeper</span><span class="token punctuation">:</span>
    <span class="token key atrule">image</span><span class="token punctuation">:</span> wurstmeister/zookeeper                    <span class="token comment"># image `wurstmeister/zookeeper`</span>
    <span class="token key atrule">container_name</span><span class="token punctuation">:</span> zookeeper_server                 <span class="token comment"># container name 'zookeeper_server'</span>
    <span class="token key atrule">restart</span><span class="token punctuation">:</span> always                                  <span class="token comment"># always restart the container after it exits</span>
    <span class="token key atrule">volumes</span><span class="token punctuation">:</span>                                         <span class="token comment"># volume mounts: map host paths to container paths</span>
      <span class="token punctuation">-</span> <span class="token string">"/etc/localtime:/etc/localtime"</span>
    <span class="token key atrule">ports</span><span class="token punctuation">:</span>                                           <span class="token comment"># port mappings</span>
      <span class="token punctuation">-</span> <span class="token string">"2181:2181"</span>

  <span class="token key atrule">kafka</span><span class="token punctuation">:</span>
    <span class="token key atrule">image</span><span class="token punctuation">:</span> wurstmeister/kafka                                <span class="token comment"># image `wurstmeister/kafka`</span>
    <span class="token key atrule">container_name</span><span class="token punctuation">:</span> kafka_server                             <span class="token comment"># container name 'kafka_server'</span>
    <span class="token key atrule">restart</span><span class="token punctuation">:</span> always                                          <span class="token comment"># always restart the container after it exits</span>
    <span class="token key atrule">volumes</span><span class="token punctuation">:</span>                                                 <span class="token comment"># volume mounts: map host paths to container paths</span>
      <span class="token punctuation">-</span> <span class="token string">"/etc/localtime:/etc/localtime"</span>
    <span class="token key atrule">environment</span><span class="token punctuation">:</span>                        <span class="token comment"># environment variables, equivalent to -e in docker run</span>
      <span class="token key atrule">KAFKA_ADVERTISED_HOST_NAME</span><span class="token punctuation">:</span> www.zhengqingya.com  <span class="token comment"># TODO: replace with your host IP or domain</span>
      <span class="token key atrule">KAFKA_ADVERTISED_PORT</span><span class="token punctuation">:</span> <span class="token number">9092                      </span><span class="token comment"># port</span>
      <span class="token key atrule">KAFKA_BROKER_ID</span><span class="token punctuation">:</span> <span class="token number">0                </span><span class="token comment"># each broker in a Kafka cluster identifies itself by a unique BROKER_ID</span>
      <span class="token key atrule">KAFKA_ADVERTISED_LISTENERS</span><span class="token punctuation">:</span> PLAINTEXT<span class="token punctuation">:</span>//www.zhengqingya.com<span class="token punctuation">:</span><span class="token number">9092 </span><span class="token comment"># TODO: address and port Kafka registers with ZooKeeper for clients to connect to</span>
      <span class="token key atrule">KAFKA_LISTENERS</span><span class="token punctuation">:</span> PLAINTEXT<span class="token punctuation">:</span>//0.0.0.0<span class="token punctuation">:</span><span class="token number">9092        </span><span class="token comment"># address and port Kafka listens on</span>
      <span class="token key atrule">KAFKA_ZOOKEEPER_CONNECT</span><span class="token punctuation">:</span> www.zhengqingya.com<span class="token punctuation">:</span><span class="token number">2181 </span><span class="token comment"># TODO: ZooKeeper address</span>
      <span class="token key atrule">KAFKA_CREATE_TOPICS</span><span class="token punctuation">:</span> <span class="token string">"hello_world"</span>
    <span class="token key atrule">ports</span><span class="token punctuation">:</span>                              <span class="token comment"># port mappings</span>
      <span class="token punctuation">-</span> <span class="token string">"9092:9092"</span>
    <span class="token key atrule">depends_on</span><span class="token punctuation">:</span>                         <span class="token comment"># start this container after the services it depends on</span>
      <span class="token punctuation">-</span> zookeeper

  <span class="token key atrule">kafka-manager</span><span class="token punctuation">:</span>
    <span class="token key atrule">image</span><span class="token punctuation">:</span> sheepkiller/kafka<span class="token punctuation">-</span>manager                         <span class="token comment"># image `sheepkiller/kafka-manager`</span>
    <span class="token key atrule">container_name</span><span class="token punctuation">:</span> kafka<span class="token punctuation">-</span>manager                            <span class="token comment"># container name 'kafka-manager'</span>
    <span class="token key atrule">restart</span><span class="token punctuation">:</span> always                                          <span class="token comment"># always restart the container after it exits</span>
    <span class="token key atrule">environment</span><span class="token punctuation">:</span>                        <span class="token comment"># environment variables, equivalent to -e in docker run</span>
      <span class="token key atrule">ZK_HOSTS</span><span class="token punctuation">:</span> www.zhengqingya.com<span class="token punctuation">:</span><span class="token number">2181  </span><span class="token comment"># TODO: ZooKeeper address</span>
      <span class="token key atrule">APPLICATION_SECRET</span><span class="token punctuation">:</span> zhengqing
      <span class="token key atrule">KAFKA_MANAGER_AUTH_ENABLED</span><span class="token punctuation">:</span> "true"  <span class="token comment"># enable kafka-manager authentication</span>
      <span class="token key atrule">KAFKA_MANAGER_USERNAME</span><span class="token punctuation">:</span> admin       <span class="token comment"># login username</span>
      <span class="token key atrule">KAFKA_MANAGER_PASSWORD</span><span class="token punctuation">:</span> <span class="token number">123456      </span><span class="token comment"># login password</span>
    <span class="token key atrule">ports</span><span class="token punctuation">:</span>                              <span class="token comment"># port mappings</span>
      <span class="token punctuation">-</span> <span class="token string">"9001:9000"</span>
    <span class="token key atrule">depends_on</span><span class="token punctuation">:</span>                         <span class="token comment"># start this container after the services it depends on</span>
      <span class="token punctuation">-</span> kafka
</code></pre>
<h4><a id="2__69"></a>2. Run</h4>
<pre><code class="prism language-shell">docker-compose -f docker-compose-kafka.yml -p kafka up -d
</code></pre>
<p><img src="https://img-blog.csdnimg.cn/20200421234825103.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzM4MjI1NTU4,size_16,color_FFFFFF,t_70" alt="Screenshot"><br>
<img src="https://img-blog.csdnimg.cn/20200421234914526.png" alt="Screenshot"></p>
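<p>Once the containers are up, it can help to confirm that the three mapped ports accept connections. The following is a minimal sketch using only the JDK; the class name <code>PortCheck</code> and the 2-second timeout are illustrative assumptions, and the host should be replaced with the address used in the compose file. An open port only shows that something is listening, not that Kafka has finished starting.</p>
<pre><code class="prism language-java">import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of the original demo project.
public class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    public static boolean isOpen(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: the Docker host is passed as the first argument, otherwise localhost is used.
        String host = args.length != 0 ? args[0] : "127.0.0.1";
        int[] ports = {2181, 9092, 9001}; // zookeeper, kafka, kafka-manager (host-side ports)
        for (int port : ports) {
            System.out.println(host + ":" + port + " open = " + isOpen(host, port, 2000));
        }
    }
}
</code></pre>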
<h4><a id="3_kafkamanagerkafka_78"></a>3. <code>kafka-manager</code> (Kafka cluster management tool)</h4>
<p>Open <a href="http://www.zhengqingya.com:9001/"><code>ip:9001</code></a> in a browser.</p>
<blockquote>
<p>Tip: only basic usage is shown below; consult other resources for more details.</p>
</blockquote>
<h6><a id="_Cluster_Cluster__Add_Cluster_84"></a>① Create a cluster: click <code>Cluster</code> -&gt; <code>Add Cluster</code></h6>
<p><img src="https://img-blog.csdnimg.cn/20200421221632917.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzM4MjI1NTU4,size_16,color_FFFFFF,t_70" alt="Screenshot"><br>
<img src="https://img-blog.csdnimg.cn/20200421235613333.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzM4MjI1NTU4,size_16,color_FFFFFF,t_70" alt="Screenshot"><br>
If clicking Save reports that a value must be at least 2, as shown below, change that field's default value to 2.<br>
<img src="https://img-blog.csdnimg.cn/20200421222702691.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzM4MjI1NTU4,size_16,color_FFFFFF,t_70" alt="Screenshot"></p>
<h6><a id="_topic_90"></a>② View topics</h6>
<p><img src="https://img-blog.csdnimg.cn/20200421235705960.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzM4MjI1NTU4,size_16,color_FFFFFF,t_70" alt="Screenshot"><br>
<img src="https://img-blog.csdnimg.cn/2020042123572469.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzM4MjI1NTU4,size_16,color_FFFFFF,t_70" alt="Screenshot"><br>
<img src="https://img-blog.csdnimg.cn/20200422000051554.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzM4MjI1NTU4,size_16,color_FFFFFF,t_70" alt="Screenshot"></p>
<h3><a id="SpringBootKafka_96"></a>III. Integrating Kafka with SpringBoot</h3>
<h4><a id="_pomxml_98"></a>① Add the dependency to <code>pom.xml</code></h4>
<pre><code class="prism language-xml"><span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>dependency</span><span class="token punctuation">&gt;</span></span>
    <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>groupId</span><span class="token punctuation">&gt;</span></span>org.springframework.kafka<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>groupId</span><span class="token punctuation">&gt;</span></span>
    <span class="token tag"><span class="token tag"><span class="token punctuation">&lt;</span>artifactId</span><span class="token punctuation">&gt;</span></span>spring-kafka<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>artifactId</span><span class="token punctuation">&gt;</span></span>
<span class="token tag"><span class="token tag"><span class="token punctuation">&lt;/</span>dependency</span><span class="token punctuation">&gt;</span></span>
</code></pre>
<h4><a id="_applicationymlkafka_107"></a>② Configure Kafka in <code>application.yml</code></h4>
<pre><code class="prism language-yml"><span class="token key atrule">spring</span><span class="token punctuation">:</span>
  <span class="token comment"># ======================== ↓↓↓↓↓↓ Kafka configuration ↓↓↓↓↓↓ ===============================</span>
  <span class="token key atrule">kafka</span><span class="token punctuation">:</span>
    <span class="token key atrule">bootstrap-servers</span><span class="token punctuation">:</span> www.zhengqingya.com<span class="token punctuation">:</span><span class="token number">9092 </span><span class="token comment"># Kafka server address; for a cluster, separate multiple addresses with commas</span>
    <span class="token key atrule">producer</span><span class="token punctuation">:</span>
      <span class="token comment"># Serializers for the message key and the message value</span>
      <span class="token key atrule">key-serializer</span><span class="token punctuation">:</span> org.apache.kafka.common.serialization.StringSerializer
      <span class="token key atrule">value-serializer</span><span class="token punctuation">:</span> org.apache.kafka.common.serialization.StringSerializer
      <span class="token comment"># Number of retries after a failed write. When the leader fails, a replica takes over as leader, and writes may fail during the switch.</span>
      <span class="token comment"># With retries set to 0 the producer never resends, so a failed write is lost. With retries above 0 it resends once the replica has fully become leader, so the message is not lost.</span>
      <span class="token key atrule">retries</span><span class="token punctuation">:</span> <span class="token number">0</span>
      <span class="token comment"># Maximum size of a batch in bytes; the producer accumulates records for the same partition and sends them together</span>
      <span class="token key atrule">batch-size</span><span class="token punctuation">:</span> <span class="token number">16384</span>
      <span class="token comment"># Total memory in bytes the producer may use to buffer records waiting to be sent; once it is exhausted, send() blocks</span>
      <span class="token key atrule">buffer-memory</span><span class="token punctuation">:</span> <span class="token number">33554432</span>
      <span class="token comment"># Number of acknowledgments the producer requires the leader to have received before a request is considered complete; this controls the durability of sent records on the server. Possible values:</span>
      <span class="token comment"># acks = 0: the producer does not wait for any acknowledgment from the server. The record is added to the socket buffer immediately and considered sent. There is no guarantee that the server received it, the retries setting has no effect (the client generally will not know about failures), and the offset returned for each record is always -1.</span>
      <span class="token comment"># acks = 1: the leader writes the record to its local log and responds without waiting for full acknowledgment from all replicas. If the leader fails right after acknowledging the record but before the replicas have copied it, the record is lost.</span>
      <span class="token comment"># acks = all: the leader waits for the full set of in-sync replicas to acknowledge the record. The record is not lost as long as at least one in-sync replica stays alive. This is the strongest guarantee and is equivalent to acks = -1.</span>
      <span class="token comment"># Allowed values: all, -1, 0, 1</span>
      <span class="token key atrule">acks</span><span class="token punctuation">:</span> <span class="token number">1</span>
    <span class="token key atrule">consumer</span><span class="token punctuation">:</span>
      <span class="token key atrule">group-id</span><span class="token punctuation">:</span> default_consumer_group <span class="token comment"># default consumer group ID</span>
      <span class="token key atrule">enable-auto-commit</span><span class="token punctuation">:</span> <span class="token boolean important">true</span>
      <span class="token key atrule">auto-commit-interval</span><span class="token punctuation">:</span> <span class="token number">1000</span>
      <span class="token comment"># Deserializers for the message key and the message value</span>
      <span class="token key atrule">key-deserializer</span><span class="token punctuation">:</span> org.apache.kafka.common.serialization.StringDeserializer
      <span class="token key atrule">value-deserializer</span><span class="token punctuation">:</span> org.apache.kafka.common.serialization.StringDeserializer
</code></pre>
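<p>Spring Boot hands these settings to the underlying Kafka client under the client's own dotted property names. As a rough sketch of that mapping for the producer-related settings, the class below builds the equivalent <code>java.util.Properties</code>; the class name <code>ProducerProps</code> is hypothetical, and the values simply mirror the YAML above.</p>
<pre><code class="prism language-java">import java.util.Properties;

// Hypothetical helper, not part of the original demo project.
public class ProducerProps {

    // Builds the Kafka client properties that correspond to the producer settings in application.yml.
    public static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("retries", "0");              // retries
        props.setProperty("batch.size", "16384");       // batch-size, in bytes
        props.setProperty("buffer.memory", "33554432"); // buffer-memory, in bytes
        props.setProperty("acks", "1");                 // acks
        return props;
    }

    public static void main(String[] args) {
        // Prints the resulting key=value pairs.
        build("www.zhengqingya.com:9092").list(System.out);
    }
}
</code></pre>
<p>With the kafka-clients library on the classpath, such a <code>Properties</code> object could be passed to <code>new KafkaProducer&lt;&gt;(props)</code>; inside a Spring Boot application this wiring is done automatically.</p>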
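<h4><a id="____140"></a>③ Producer - sending messages</h4>
<pre><code class="prism language-java"><span class="token keyword">import</span> org<span class="token punctuation">.</span>springframework<span class="token punctuation">.</span>beans<span class="token punctuation">.</span>factory<span class="token punctuation">.</span>annotation<span class="token punctuation">.</span>Autowired<span class="token punctuation">;</span>
<span class="token keyword">import</span> org<span class="token punctuation">.</span>springframework<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>core<span class="token punctuation">.</span>KafkaTemplate<span class="token punctuation">;</span>
<span class="token keyword">import</span> org<span class="token punctuation">.</span>springframework<span class="token punctuation">.</span>web<span class="token punctuation">.</span>bind<span class="token punctuation">.</span>annotation<span class="token punctuation">.</span>RequestMapping<span class="token punctuation">;</span>
<span class="token keyword">import</span> org<span class="token punctuation">.</span>springframework<span class="token punctuation">.</span>web<span class="token punctuation">.</span>bind<span class="token punctuation">.</span>annotation<span class="token punctuation">.</span>RestController<span class="token punctuation">;</span>

<span class="token annotation punctuation">@RestController</span>
<span class="token annotation punctuation">@RequestMapping</span><span class="token punctuation">(</span><span class="token string">"/api/"</span><span class="token punctuation">)</span>
<span class="token keyword">public</span> <span class="token keyword">class</span> <span class="token class-name">Producer</span> <span class="token punctuation">{</span>
    <span class="token annotation punctuation">@Autowired</span>
    <span class="token keyword">private</span> KafkaTemplate<span class="token generics function"><span class="token punctuation">&lt;</span>String<span class="token punctuation">,</span> Object<span class="token punctuation">&gt;</span></span> kafkaTemplate<span class="token punctuation">;</span>

    <span class="token comment">// Publishes msg to the "hello" topic. send() is asynchronous, so "SUCCESS"</span>
    <span class="token comment">// only means the record was handed to the producer, not that the broker stored it.</span>
    <span class="token annotation punctuation">@RequestMapping</span><span class="token punctuation">(</span><span class="token string">"send"</span><span class="token punctuation">)</span>
    <span class="token keyword">public</span> String <span class="token function">send</span><span class="token punctuation">(</span>String msg<span class="token punctuation">)</span> <span class="token punctuation">{</span>
        kafkaTemplate<span class="token punctuation">.</span><span class="token function">send</span><span class="token punctuation">(</span><span class="token string">"hello"</span><span class="token punctuation">,</span> msg<span class="token punctuation">)</span><span class="token punctuation">;</span>
        <span class="token keyword">return</span> <span class="token string">"SUCCESS"</span><span class="token punctuation">;</span>
    <span class="token punctuation">}</span>
<span class="token punctuation">}</span>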
</code></pre>
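<p><code>kafkaTemplate.send</code> is asynchronous, so the controller returns <code>SUCCESS</code> before the broker has confirmed the write. One possible way to report the real outcome is to wait on the object that <code>send</code> returns. The sketch below assumes that object implements <code>java.util.concurrent.Future</code> (a <code>ListenableFuture</code> in Spring Kafka 2.x, a <code>CompletableFuture</code> in 3.x); the class name <code>SendOutcome</code> and the status strings are hypothetical.</p>
<pre><code class="prism language-java">import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of the original demo project.
public class SendOutcome {

    // Waits up to timeoutSeconds for the send to finish and maps the result to a status string.
    public static String await(Future&lt;?&gt; sendFuture, long timeoutSeconds) {
        try {
            sendFuture.get(timeoutSeconds, TimeUnit.SECONDS);
            return "SUCCESS";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag for the caller
            return "INTERRUPTED";
        } catch (ExecutionException e) {
            return "FAILED: " + e.getCause();   // the send itself failed
        } catch (TimeoutException e) {
            return "TIMEOUT";                   // no result within the time limit
        }
    }
}
</code></pre>
<p>In the controller this could be used as <code>return SendOutcome.await(kafkaTemplate.send("hello", msg), 10);</code>, at the cost of blocking the request thread until the broker answers or the timeout expires.</p>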
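<h4><a id="____157"></a>④ Consumer - receiving messages</h4>
<pre><code class="prism language-java"><span class="token keyword">import</span> lombok<span class="token punctuation">.</span>extern<span class="token punctuation">.</span>slf4j<span class="token punctuation">.</span>Slf4j<span class="token punctuation">;</span>
<span class="token keyword">import</span> org<span class="token punctuation">.</span>apache<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>clients<span class="token punctuation">.</span>consumer<span class="token punctuation">.</span>ConsumerRecord<span class="token punctuation">;</span>
<span class="token keyword">import</span> org<span class="token punctuation">.</span>springframework<span class="token punctuation">.</span>kafka<span class="token punctuation">.</span>annotation<span class="token punctuation">.</span>KafkaListener<span class="token punctuation">;</span>
<span class="token keyword">import</span> org<span class="token punctuation">.</span>springframework<span class="token punctuation">.</span>stereotype<span class="token punctuation">.</span>Component<span class="token punctuation">;</span>

<span class="token annotation punctuation">@Slf4j</span>
<span class="token annotation punctuation">@Component</span>
<span class="token keyword">public</span> <span class="token keyword">class</span> <span class="token class-name">Consumer</span> <span class="token punctuation">{</span>
    <span class="token comment">// Called for every record published to the "hello" topic</span>
    <span class="token annotation punctuation">@KafkaListener</span><span class="token punctuation">(</span>topics <span class="token operator">=</span> <span class="token string">"hello"</span><span class="token punctuation">)</span>
    <span class="token keyword">public</span> <span class="token keyword">void</span> <span class="token function">listen</span><span class="token punctuation">(</span>ConsumerRecord<span class="token operator">&lt;</span><span class="token operator">?</span><span class="token punctuation">,</span> <span class="token operator">?</span><span class="token operator">&gt;</span> record<span class="token punctuation">)</span> <span class="token punctuation">{</span>
        log<span class="token punctuation">.</span><span class="token function">info</span><span class="token punctuation">(</span><span class="token string">"topic: {}  &lt;|============|&gt;  message: {}"</span><span class="token punctuation">,</span> record<span class="token punctuation">.</span><span class="token function">topic</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span> record<span class="token punctuation">.</span><span class="token function">value</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
    <span class="token punctuation">}</span>
<span class="token punctuation">}</span>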
</code></pre>
<h3><a id="_171"></a>IV. Testing</h3>
<p>① Call the endpoint to send a message: <a href="http://127.0.0.1/api/send?msg=hello,kafka">http://127.0.0.1/api/send?msg=hello,kafka</a></p>
<p>② Check the log output in the console:<br>
<img src="https://img-blog.csdnimg.cn/20200421233248635.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzM4MjI1NTU4,size_16,color_FFFFFF,t_70" alt="Screenshot"></p>
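<p>The request in step ① can also be issued from Java. Below is a small sketch using the JDK 11+ <code>java.net.http</code> client; the class name <code>SendClient</code> is hypothetical, and the base URL assumes the application listens on port 80 as in the link above. The message is URL-encoded so that characters such as the comma or a space are transmitted safely in the query string.</p>
<pre><code class="prism language-java">import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical client, not part of the original demo project.
public class SendClient {

    // Builds the /api/send URI with the message URL-encoded as the msg query parameter.
    public static URI sendUri(String baseUrl, String msg) {
        return URI.create(baseUrl + "/api/send?msg=" + URLEncoder.encode(msg, StandardCharsets.UTF_8));
    }

    // Calls the endpoint with a GET request and returns the response body.
    public static String send(String baseUrl, String msg) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(sendUri(baseUrl, msg)).GET().build();
        HttpResponse&lt;String&gt; response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }

    public static void main(String[] args) throws Exception {
        // Assumption: the Spring Boot application from this article is running locally on port 80.
        System.out.println(send("http://127.0.0.1", "hello,kafka"));
    }
}
</code></pre>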
<hr>
<h3><a id="demo_182"></a>Demo source code for this article</h3>
<p><a href="https://gitee.com/zhengqingya/java-workspace">https://gitee.com/zhengqingya/java-workspace</a></p>
</div>
</body>

</html>
